package com.atguigu.flink.state.keyedstate;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Created by Smexy on 2023/11/17
 *
 * Example: compute the running sum of water levels (vc) for each sensor.
 */
public class Demo4_ReducingState
{
    /** Host name of the socket text source the job reads from. */
    private static final String SOURCE_HOST = "hadoop102";

    /** Port of the socket text source the job reads from. */
    private static final int SOURCE_PORT = 8888;

    /**
     * Builds and runs the demo job: reads text lines from a socket, maps each line to a
     * {@link WaterSensor} via {@link WaterSensorMapFunction}, keys the stream by sensor id,
     * accumulates the {@code vc} values per key in a {@link ReducingState}, and prints one
     * output string per input element.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if {@code env.execute()} throws; the original exception
     *                               is attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .socketTextStream(SOURCE_HOST, SOURCE_PORT)
            .map(new WaterSensorMapFunction())
            // Key by sensor id so that the reducing state below is scoped per sensor.
            .keyBy(WaterSensor::getId)
            .process(new VcSumFunction())
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate the failure with its cause so that a failed job makes main terminate
            // abnormally instead of only leaving a stack trace on stderr and returning normally.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Keyed process function that adds every incoming {@code vc} value to a per-key
     * {@link ReducingState} and emits the current key together with the accumulated sum.
     */
    private static final class VcSumFunction extends KeyedProcessFunction<String, WaterSensor, String>
    {
        /** Handle to the reducing state named "sumVc"; assigned in {@link #open(Configuration)}. */
        private transient ReducingState<Integer> sumVc;

        /**
         * Obtains the "sumVc" reducing state from the runtime context. The state combines
         * values by integer addition.
         *
         * <p>Original author's note: this runs after the task has been created, when state is
         * restored from its backup.
         *
         * @param parameters configuration passed in by the framework; not read here
         * @throws Exception declared by the overridden method
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            sumVc = getRuntimeContext().getReducingState(
                new ReducingStateDescriptor<>("sumVc", Integer::sum, Integer.class));
        }

        /**
         * Adds the element's {@code vc} to the state and emits the current key followed by the
         * aggregated sum.
         *
         * @param value the incoming sensor reading
         * @param ctx   context used to read the current key
         * @param out   collector receiving one string per input element
         * @throws Exception if the state access fails
         */
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
            // Adding to a ReducingState applies the reduce function, so the state holds the sum.
            sumVc.add(value.getVc());

            // NOTE(review): the key and the "sumVc:" label are concatenated without a separator;
            // kept as-is so the emitted text does not change.
            out.collect(ctx.getCurrentKey() + "sumVc:" + sumVc.get());
        }
    }
}
